<?php

$port = 8080;
$socket = @stream_socket_server("tcp://0.0.0.0:$port", $errno, $errMsg);
if ($socket === false) {
    throw new \RuntimeException("fail to listen on port: {$port}!");
}
fwrite(STDOUT, "socket server listen on port: {$port}" . PHP_EOL);
stream_set_blocking($socket, false);

$clients = $changed = $clientInfo = $cartCheck = $multiData = $apps = array();

$keepRun = $new = $newMain = $newBypass = true;
$input =  false;
$mainProcessId = 0;

do {
    if (true == $newBypass) {
        // 协程 / 入口
        $bypass = coroutineProcess();
        $newBypass = false;
    }
    $input = $bypass->current();
    // 这里要判断是 协程返回，还是新入口
    if (true == $newMain) {
        // 主程序
        $app = mainProcess();
        $newMain = false;
    }

    // 开始运行
    if (-1 === $input) {
        // 这个流程是结束的, 那几进入下一个循环
        $newBypass = true;
        continue;
    } elseif (false === $input || NULL === $input) {
        // 协程返回了数据，不过是空
        $newBypass = true;
        continue;
    } elseif (is_array($input)) {
        // 有数据，且有效
        // 如果是新请求数据
        if ('request' == $input['type']) {
            $app = mainProcess();
            $mainProcessId ++;
            fwrite(STDOUT, "Create a new main process, Id: " . $mainProcessId . PHP_EOL);
            $apps[$mainProcessId] = $app;
            $output = $app->current();
            $app->send($input['data']);
        } elseif ('response' == $input['type']) {
            // 如果是响应数据, 那么是响应哪个主流程的呢？
            $goToId = $input['processId'];
            $app = $apps[$goToId];
            fwrite(STDOUT, "Go back main process Id: " . $goToId . PHP_EOL);
            $output = $app->current();
            $app->send($input);
//        $app->next();
        }
    } else {
        // loop
//        $app->current();
    }
    $bypass->next();
    $re = $bypass->getReturn();
} while ($keepRun);


/**
 * 业务主"进程"
 *
 * @return bool|Generator
 */
function mainProcess()
{
    global $clients, $clientInfo;
    fwrite(STDOUT, "start main process, time" . date('Y-m-d H:i:s', time()) . "\n");
    // 接收请求数据
    $json = yield;
//    $json = json_decode($data, true);
    $client = false;

    $noBlocking = boolval($json['noBlocking']);
    $noBlocking = true;
    $productId = $json['data']['productId'];
    fwrite(STDOUT, "start check cart , product_id: $productId, time" . date('Y-m-d H:i:s', time()) . "\n");

    // 非阻塞方式调用其他服务
    $cartCheck[$productId] = array(
        'requestSocket' => $client,            // 这里记录发起亲求的socket
    );

    $inventorySocket = $productSocket = $promoSocket = null;
    $requestSuc = checkInventory($productId, $inventorySocket, $noBlocking);
    if (!$requestSuc) {
        return false;
    }
    $clients[] = $inventorySocket;

    $requestSuc = checkProduct($productId, $productSocket, $noBlocking);
    if (!$requestSuc) {
        return false;
    }
    $clients[] = $productSocket;

    $requestSuc = checkPromo($productId, $promoSocket, $noBlocking);
    if (!$requestSuc) {
        return false;
    }
    $clients[] = $promoSocket;

    // 这就完成了 3个检查请求
    fwrite(STDOUT, "completely send check requests, product_id: $productId, time" . date('Y-m-d H:i:s', time()) . "\n");

    $data[] = yield;
    $data[] = yield;
    $data[] = yield;

    var_export($data);

    return true;
}

/**
 * 消息 listener
 * @return bool
 */
function coroutineProcess()
{
    checkMessage();
    fwrite(STDOUT, "\nnew read message\n");
    accept();

    $bypassRun = true;
    do {
        $bypass = handleMessage();
        $re = $bypass->current();
        if (-1 === $re) {
            yield -1;
        } elseif (NULL === $re) {
            yield $re;
        } elseif ($re) {
            yield $re;
        }

        $bypassRun = false;
//        $bypass->next();
    } while ($bypassRun);

    return true;
}

/**
 * 接收当前所有 socket 的可读事件
 */
function checkMessage() {
    global $socket, $changed, $clients, $clientInfo;
    $changed = array_merge([$socket], $clients);
    $write = null;
    $except = null;
    stream_select($changed, $write, $except, null);
}

/**
 * 接收新的请求
 */
function accept() {
    global $socket, $changed, $clients, $clientInfo;
    if (!in_array($socket, $changed)) {
        return;
    }

    while ($client = @stream_socket_accept($socket, 0)) {
        $clients[] = $client;
        $resourceStr = strval($client);
        $clientInfo[$resourceStr] = array('type' => 'request', 'resource' => $client);


        fwrite(STDOUT, "client:" . (int)$client . " connected. " . date('Y-m-d H:i:s') . "\n");
        stream_set_blocking($client, false);

        $key = array_search($client, $changed);
        unset($changed[$key]);
    }
}

/**
 *
 * @param      $host
 * @param      $port
 * @param      $method
 * @param      $data
 * @param      $socket
 * @param bool $noBlocking
 * @return bool|false|string
 */
function checkClient($host, $port, $method, $data, &$socket, $noBlocking = true)
{
    global $clientInfo, $mainProcessId;
    fwrite(STDOUT, 'request other client: ' . json_encode(func_get_args()) . "\n");

    $socket = @stream_socket_client("tcp://{$host}:{$port}", $errno, $errMsg);
    if ($socket === false) {
        throw new \RuntimeException("unable to create socket: " . $errMsg);
    }
    fwrite(STDOUT, "\nconnect to server: [{$host}:{$port}]...\n");
    $socketStr = strval($socket);
    $clientInfo[$socketStr] = array('type' => 'response', 'resource' => $socket, 'processId' => $mainProcessId);

    $message = json_encode([
        "method" => $method,
        "data" => $data,
    ]);

    fwrite(STDOUT, "send to server: $message\n");
    $len = @fwrite($socket, $message);
    if ($len === 0) {
        fwrite(STDOUT, "socket closed\n");
        return false;
    }

    if ($noBlocking) {
        // 非阻塞方式, 不等结果，之间返回调用成功
        return true;
    } else {
        // 阻塞方式，等待响应，读取后返回
        $msg = @fread($socket, 4096);
        if ($msg) {
            fwrite(STDOUT, "receive server: $msg\n");
            // 获取到返回结果后，再返回
            return json_decode($msg, true);
        } elseif (feof($socket)) {
            fwrite(STDOUT, "socket closed\n");
            return false;
        }
        return true;
    }
}

/**
 * 发起请求
 *
 * @param integer   $productId   产品ID
 * @param resource  $socket     套接字
 * @param bool      $noBlocking      阻塞与否
 *
 * @return bool|false|string
 */
function checkInventory($productId, &$socket, $noBlocking = true)
{
    // client.php
    $host = "127.0.0.1";
    $port = 8081;

    $data = array('productId' => $productId);

    return checkClient($host, $port, 'inventory', $data, $socket, $noBlocking);
}

/**
 * 检查产品可售
 *
 * @param      $productId
 * @param      $socket
 * @param bool $noBlocking
 * @return bool|false|string
 */
function checkProduct($productId, &$socket, $noBlocking = true)
{
    // client.php
    $host = "127.0.0.1";
    $port = 8082;

    $data = array('productId' => $productId);

    return checkClient($host, $port, 'product', $data, $socket, $noBlocking);
}

/**
 * 检查促销信息
 *
 * @param      $productId
 * @param      $socket
 * @param bool $noBlocking
 * @return bool|false|string
 */
function checkPromo($productId, &$socket, $noBlocking = true)
{
    // client.php
    $host = "127.0.0.1";
    $port = 8083;

    $data = array('productId' => $productId);

    return checkClient($host, $port, 'promo', $data, $socket, $noBlocking);
}

/**
 * 三项检查是否已完成
 *
 * @param $oneCartCheck
 * @param $reProductId
 */
function checkAllComplete($oneCartCheck, $reProductId)
{
    global $cartCheck, $clients, $changed;
    if (isset($oneCartCheck['inventory']) && isset($oneCartCheck['product']) && isset($oneCartCheck['promo'])) {
        // 这里定义了一个cartCheck 全局变量，减产完毕，就会被设置为 true。三项为true就表示检查完毕
        $checkRe = $oneCartCheck['inventory'] && ($oneCartCheck['product']) && ($oneCartCheck['promo']);
        $requestSocket = $oneCartCheck['requestSocket'];
        $requestKey =  array_search($requestSocket, $clients);
//        $requestSocket = $clients[$requestKey];
        $reData = array('method' => 'cart', 'data' => array('product_id' => $reProductId), 're' => $checkRe, 'msg' => 'suc');
        $msg = json_encode($reData);
        fwrite($requestSocket, "response " . (int)$requestSocket . ": $msg");
        fclose($requestSocket);
        unset($clients[$requestKey], $cartCheck[$reProductId]);
        fwrite(STDOUT, "complete response " . (int)$requestSocket . ", time " . date('Y-m-d H:i:s'));
    }
}

/**
 * 响应来自客户加购请求
 *
 * @param $socket
 * @param $method
 * @param $productId
 * @param $re
 * @return bool
 */
function responseClient($socket, $productId, $re)
{
    global $clients, $clientInfo;
    $reMsg = array('method' => 'cart', 'data' => array('product_id' => $productId), 're' => $re, 'msg' => 'suc');
    $data = json_encode($reMsg);
    // 这里响应了，发起 加购 请求的客户端，
    fwrite($socket, $data);
    $key = array_search($socket, $clients);
    // 关闭连接，
    fclose($socket);
    // 响应客户端后，就把它的socket从 select 去移除
    unset($clients[$key]);
    $clientStr = strval($socket);
    unset($clientInfo[$clientStr]);
    return true;
}

/**
 * 通过有可读事件的 socket client id来判断，发起client 来自哪条主"进程"
 *
 * @param $client
 * @return mixed
 * @throws Exception
 */
function getProcessIdBySocket($client)
{
    global $clientInfo;
    $clientStr = strval($client);
    if (isset($clientInfo[$clientStr])) {
        return $clientInfo[$clientStr]['processId'];
    } else {
        throw new \Exception('unknown socket client!');
    }
}

/**
 * 可读消息处理
 */
function handleMessage() {
    global $changed, $clients, $cartCheck, $clientInfo, $mainProcessId;
    foreach ($changed as $key => $client) {
        while (true) {

            $msg = @fread($client, 1024);
//            $msg = 1;
            if ($msg) {
                fwrite(STDOUT, "receive client " . (int)$client . " message: " . substr ($msg, 0, 20) . date('Y-m-d H:i:s', time()) . "\n");

                $clientStr = strval($client);
                $info = $clientInfo[$clientStr];
                // socket 有可读事件，还需呀判断是新请求，还是 来自客户端的响应？
                if ('request' == $info['type']) {
                    // 新请求
                    $json = json_decode($msg, true);

                    yield array('data' => $json, 'type' => 'request', 'client' => $clientStr);
                } elseif ('response' == $info['type']) {
                    // 自身发起的请求，有响应了
                    $json = json_decode($msg, true);

                    // 这个请求响应 需要投递给哪位发起请求的主呢？
                    $theProcessId = getProcessIdBySocket($client);
                    yield array('data' => $json, 'type' => 'response', 'client' => $clientStr, 'processId' => $theProcessId);
                }

            } else {
                if (feof($client)) {
//                    fwrite(STDOUT, "\nclient " . (int)$client . " closed.\n");
//                    fclose($client);
//                    $key = array_search($client, $clients);
//                    unset($clients[$key]);
                    sleep(1);
                }
                break;
            }
        }
    }
    yield -1;
}

